# Copyright (c) 2019 Red Hat, Inc. All rights reserved. This copyrighted
# material is made available to anyone wishing to use, modify, copy, or
# redistribute it subject to the terms and conditions of the GNU General Public
# License v.2 or later.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc., 51
# Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
"""Execution of tests from the database."""

from enum import Enum
from functools import reduce
import re
from textwrap import dedent

import jinja2
from lxml import etree
from lxml import isoschematron

from kpet import data
from kpet import host_condition
from kpet import misc

# TODO: Make the module shorter, but meanwhile, pylint: disable=too-many-lines


class Test:     # pylint: disable=too-few-public-methods
    """A test run."""

    # make kpet.data.Test attributes accessible without having to use `.data`

    def __getattr__(self, attr):
        """Get atribute."""
        return getattr(self.data, attr)

    def __init__(self, test_data, waived,
                 targeted_sources, triggered_sources):
        """
        Initialize a test run.

        Args:
            test_data:          The executed test's data (a kpet.data.Test
                                instance).
            waived:             True if the test result is waived (should be
                                omitted from the summary). False otherwise.
            targeted_sources:   The set of source files this test run is
                                targeting, or None if the source files are
                                unknown.
            triggered_sources:  The set of source files (targeted or not) that
                                triggered this test run, or None if the source
                                files are unknown.
        """
        assert isinstance(test_data, data.Test)
        assert isinstance(waived, bool)
        assert targeted_sources is None or (
            isinstance(targeted_sources, set) and
            all(isinstance(s, str) for s in targeted_sources) and
            test_data.select_targeted_sources(targeted_sources) ==
            targeted_sources
        )
        assert triggered_sources is None or (
            isinstance(triggered_sources, set) and
            all(isinstance(s, str) for s in triggered_sources) and
            triggered_sources >= targeted_sources and
            test_data.select_triggered_sources(triggered_sources) ==
            triggered_sources
        )
        self.data = test_data
        self.waived = waived
        self.targeted_sources = targeted_sources
        self.triggered_sources = triggered_sources


class BasicHost:
    # pylint: disable=too-few-public-methods, too-many-instance-attributes
    """A basic host running tests."""

    def __init__(self, basic_host_data, tests):
        """
        Initialize a basic host run.

        Args:
            basic_host_data:    Basic host data.
            tests:              A list of tests (kpet.run.Test instances).
        """
        assert isinstance(basic_host_data, data.BasicHost)
        assert isinstance(tests, list)
        assert all(isinstance(test, Test) for test in tests)

        self.description = basic_host_data.description
        self.ignore_panic = basic_host_data.ignore_panic
        self.preboot_tasks = basic_host_data.preboot_tasks
        self.postboot_tasks = basic_host_data.postboot_tasks
        self.kernel_options = basic_host_data.kernel_options
        self.kernel_options_post = basic_host_data.kernel_options_post
        self.tests = tests

        # Collect all unique cases referenced by tests,
        # in per-test top-bottom series
        self.cases = []
        for test in tests:
            pos = len(self.cases)
            case = test.case
            while case is not None:
                if case not in self.cases:
                    self.cases.insert(pos, case)
                case = case.parent

        # Assemble partitions and kickstart lists
        partitions_list = [basic_host_data.partitions]
        kickstart_list = [basic_host_data.kickstart]
        for case in self.cases:
            partitions_list.append(case.partitions)
            kickstart_list.append(case.kickstart)

        # Remove undefined template paths
        def list_without_none(iterable):
            return list(filter(lambda e: e is not None, iterable))
        self.partitions_list = list_without_none(partitions_list)
        self.kickstart_list = list_without_none(kickstart_list)

        # Put waived tests at the end
        self.tests.sort(key=lambda t: t.waived)


class Guest(BasicHost):
    # pylint: disable=too-few-public-methods, too-many-instance-attributes
    """A guest (VM) running tests."""

    def __init__(self, guest_data, tests):
        """
        Initialize a host run.

        Args:
            guest_data: Guest data.
            tests:      A list of tests (kpet.run.Test instances).
        """
        assert isinstance(guest_data, data.Guest)
        assert isinstance(tests, list)
        assert all(isinstance(test, Test) for test in tests)

        super().__init__(guest_data, tests)
        self.args = guest_data.args


class Host(BasicHost):
    # pylint: disable=too-few-public-methods, too-many-instance-attributes
    """A host running tests."""

    def __init__(self, host_type_name, host_type_data, condition, host_tests):
        """
        Initialize a host run.

        Args:
            host_type_name:     Name of the host type.
            host_type_data:     Host type data.
            condition:          A host condition expression.
            host_tests:         A list of tests (kpet.run.Test instances) to
                                run on the host, or a dictionary of guest
                                names and lists of tests to run on them, with
                                the empty name corresponding to the host
                                itself.
        """
        assert isinstance(host_type_name, str)
        assert isinstance(host_type_data, data.HostType)
        assert host_condition.is_valid(condition)
        if isinstance(host_tests, list):
            host_tests = {"": host_tests}
        assert isinstance(host_tests, dict)
        assert all(
            isinstance(host_name, str) and
            (host_name == "" or host_name in host_type_data.guests) and
            isinstance(tests, list) and
            all(isinstance(test, Test) for test in tests)
            for host_name, tests in host_tests.items()
        )

        super().__init__(host_type_data, host_tests.get("", []))

        self.type_name = host_type_name
        # TODO: Remove once users switch to self.description
        self.type_description = host_type_data.description
        self.hostname = host_type_data.hostname

        # Assemble host_requires list
        host_requires_list = [host_type_data.host_requires]
        for case in self.cases:
            host_requires_list.append(case.host_requires)

        # Remove undefined template paths
        host_requires_list = [e for e in host_requires_list if e is not None]

        # Generate minimized and normalized (stable) host condition
        condition = [condition]
        if self.hostname is not None:
            condition.append("H:" + self.hostname)
        condition += ["T:" + t for t in host_requires_list]
        self.condition = host_condition.minimize(condition)

        # Create guests
        self.guests = {}
        for guest_name, guest_data in host_type_data.guests.items():
            guest_tests = host_tests.get(guest_name, [])
            if guest_tests:
                self.guests[guest_name] = Guest(guest_data, guest_tests)


class HighCostCondition(Enum):
    """Condition for inclusion of high-cost tests."""

    # Never
    NO = 0
    FALSE = 0
    # When "targeted"
    TARGETED = 1
    # When "targeted" or "triggered"
    TRIGGERED = 2
    # Always
    YES = 3
    TRUE = 3


class UnavailableComponents(Exception):
    """Specified components are unavailable."""


# It's complicated,
# pylint: disable=too-few-public-methods, too-many-instance-attributes
class Scene:
    """A single run of tests in a database."""

    @staticmethod
    def _filter_domains(database, paths, arch):
        """
        Filter the specified domains.

        Filter the specified domains contained in a database, according to an
        optional architecture.

        Args:
            database:   The database containing domains to be filtered.
            paths:      The list of paths of domains to filter.
            arch:       The architecture to filter domains by, or None, if
                        domains shouldn't be filtered by architecture.

        Returns:
            A new dictionary containing filtered domains indexed by paths, or
            None if the database had no domains defined.
        """
        assert isinstance(database, data.Base)
        assert database.all_domains is None or \
            (
                isinstance(paths, list) and
                all(path in database.all_domains for path in paths)
            )
        assert arch is None or arch in database.arches
        if database.all_domains is None:
            return None
        domains = {}
        for domain in database.all_domains.values():
            # Skip if the architecture doesn't match
            if arch is not None and arch not in domain.arches:
                continue
            # If the domain's path is not directly selected
            if domain.path not in paths:
                # Check if a domain's parent's path is selected
                domain_parent = domain.parent
                while domain_parent:
                    if domain_parent.path in paths:
                        break
                    domain_parent = domain_parent.parent
                else:
                    continue
            # Add the domain
            domains[domain.path] = domain
        return domains

    @staticmethod
    def _filter_host_types(database, target):
        """
        Filter a dictionary of host types.

        Filter a dictionary of host types from a database, according to
        the target.

        Args:
            database:   The database containing the host types to filter.
            target:     The target to filter host types by.

        Returns:
            A new dictionary containing the filtered host types.
        """
        assert isinstance(database, data.Base)
        assert isinstance(target, data.Target)
        return {
            name: host_type
            for name, host_type in database.host_types.items()
            if host_type.supported.matches(target)
        }

    # We have a lot of conditions,
    # pylint: disable=too-many-arguments,too-many-branches
    @staticmethod
    def _filter_and_create_tests(*, database, target, sources,
                                 triggered, targeted, high_cost,
                                 match_sets, name_regexes):
        """
        Filter and create tests.

        Filter tests from a database according to multiple parameters, and
        create test runs for them (kpet.run.Test instances).

        Args:
            database:       The database containing the tests to filter and
                            create runs for.
            target:         The target to filter tests, and waive runs by.
            sources:        A set of source file paths the tests should cover,
                            if applicable, or None, if unknown.
            triggered:      True if only triggered tests should be included.
                            False if triggered tests should not be included.
                            None triggered status of tests should be ignored.
            targeted:       True if only targeted tests should be included.
                            False if targeted tests should not be included.
                            None targeted status of tests should be ignored.
            high_cost:      The condition for inclusion of high-cost tests
                            (a HighCostCondition enum member).
            match_sets:     A function matching sets a test belongs to and
                            returning True, if it should be included in the
                            filtering, and False, if not. Must accept a set of
                            strings (names of sets the test belongs to) as the
                            only argument.
            name_regexes:   A list of string regexes to match test names
                            against. Tests that match any regex will be
                            included in the filtering.

        Returns:
            A list of test runs created for the filtered tests, and waived
            according to the target.
        """
        assert isinstance(database, data.Base)
        assert isinstance(target, data.Target)
        assert sources is None or \
            (isinstance(sources, set) and
             all(isinstance(s, str) for s in sources) and
             database.select_sources(sources) == sources)
        assert triggered is None or isinstance(triggered, bool)
        assert targeted is None or isinstance(targeted, bool)
        assert isinstance(high_cost, HighCostCondition)
        assert callable(match_sets)
        assert isinstance(name_regexes, list) and \
            all(isinstance(regex, str) for regex in name_regexes)
        tests = []
        for name, test in database.tests.items():
            if not any(re.fullmatch(regex, name) for regex in name_regexes):
                continue
            if not match_sets(test.sets):
                continue
            if not test.is_enabled_for(target):
                continue
            targeted_sources = test.select_targeted_sources(sources)
            triggered_sources = test.select_triggered_sources(sources)
            if sources is not None and not test.has_extra_coverage() and \
               not triggered_sources:
                continue
            if triggered is not None and \
               triggered != bool(triggered_sources) or \
               targeted is not None and \
               targeted != bool(targeted_sources):
                continue
            if test.high_cost:
                if high_cost == HighCostCondition.NO:
                    continue
                if high_cost == HighCostCondition.TARGETED:
                    if not test.has_extra_coverage() and \
                       not targeted_sources:
                        continue
                elif high_cost == HighCostCondition.TRIGGERED:
                    if not test.has_extra_coverage() and \
                       not triggered_sources:
                        continue
            tests.append(Test(
                test, test.is_waived_for(target),
                targeted_sources, triggered_sources
            ))
        return tests

    @staticmethod
    def _distribute_host_params(domains, host_types, global_condition):
        """
        Distribute host types to domains.

        Distribute host types to domains, and create parameters for
        instantiated hosts.

        Args:
            domains:            A path-indexed dictionary of domains to
                                distribute host types to, or None, if no
                                domains are defined.
            host_types:         A name-indexed dictionary of host types to
                                distribute.
            global_condition:   A global condition to apply to all hosts.

        Returns:
            A list of dictionaries containing parameters to be passed to Host
            constructors for instantiated hosts, with test lists empty.
        """
        hosts = []
        for host_type_name, host_type in host_types.items():
            # Start with the global condition
            condition = [global_condition]
            # If domains are defined
            if domains is not None:
                # Find first most-generic domain the host belongs to
                min_spec = None
                min_spec_domain = None
                # For each domain
                for domain in domains.values():
                    # If host type targets the domain or one of its parents
                    container = domain
                    while container:
                        if container.path in host_type.domains:
                            break
                        container = container.parent
                    else:
                        continue
                    # Remember the domain, if it's less specific
                    # (more generic) than others so far
                    spec = domain.get_specificity()
                    if min_spec is None or spec < min_spec:
                        min_spec = spec
                        min_spec_domain = domain
                # If the host type has no domain
                if min_spec_domain is None:
                    continue
                # Build host condition from the domain's branch
                domain = min_spec_domain
                while domain:
                    condition.append(domain.host_condition)
                    domain = domain.parent

            assert host_condition.is_valid(condition)
            # Store host parameters
            hosts.append({
                'host_type_name': host_type_name,
                'host_type_data': host_type,
                'condition': condition,
                'host_tests': {
                    host_name: []
                    for host_name in ("",) + tuple(host_type.guests)
                }
            })
        return hosts

    # We know this isn't great,
    # pylint: disable=too-many-arguments, too-many-locals, too-many-branches
    # pylint: disable=too-many-statements
    def __init__(self, *, database, domain_paths, target, files, triggered,
                 targeted, high_cost, match_sets, test_regexes, kernel):
        """
        Initialize a test execution scene.

        Args:
            database:       The database to get test data from.
            domain_paths:   A list of paths of host domains to run tests on.
                            Each must be present in the database. No effect,
                            if the database has no domains defined.
            target:         A test execution target (a kpet.data.Target
                            instance) to match/filter tests with.
                            Each target's tree, architecture, and component
                            must be present in the database. Both the target's
                            tree and architecture must be known (not None),
                            for the scene (its tests) to be executable.
            files:          A set of file paths including the sources the
                            tests should cover, if applicable, or None, if
                            unknown.
            triggered:      True if only triggered tests should be executed.
                            False if triggered tests should not be executed.
                            None triggered status of tests should be ignored.
            targeted:       True if only targeted tests should be executed.
                            False if targeted tests should not be executed.
                            None targeted status of tests should be ignored.
            high_cost:      The condition for inclusion of high-cost tests
                            (a HighCostCondition enum member).
            match_sets:     A function matching sets a test belongs to and
                            returning True, if it should be included in the
                            filtering, and False, if not. Must accept a set of
                            strings (names of sets the test belongs to) as the
                            only argument. Can be None to match any set
                            membership.
            test_regexes:   A list of string regexes to match
                            test names against.
                            Tests that match any regex will be included in the
                            filtering. None if all tests should be included.
            kernel:         An (arbitrary) string representing the kernel
                            location, or None, meaning no kernel was
                            specified.
        """
        assert isinstance(database, data.Base)
        assert database.domains is None or \
            (
                isinstance(domain_paths, list) and
                all(path in database.all_domains for path in domain_paths)
            )
        assert isinstance(target, data.Target)
        assert target.trees is None or \
            isinstance(target.trees, set) and \
            target.trees <= set(database.trees)
        assert target.arches is None or \
            isinstance(target.arches, set) and \
            target.arches <= set(database.arches)
        unavailable_components = sorted(
            name
            for name in target.components or set()
            if not database.components[name].supported.matches(target)
        )
        if unavailable_components:
            raise UnavailableComponents(
                f"Components {unavailable_components!r} "
                f"are not available with "
                f"specified trees, architectures and components"
            )
        assert files is None or \
            (isinstance(files, set) and
             all(isinstance(f, str) for f in files))
        assert triggered is None or isinstance(triggered, bool)
        assert targeted is None or isinstance(targeted, bool)
        assert isinstance(high_cost, HighCostCondition)
        assert match_sets is None or callable(match_sets)
        assert test_regexes is None or \
            (isinstance(test_regexes, list) and
             all(isinstance(regex, str) for regex in test_regexes))
        assert kernel is None or isinstance(kernel, str)

        self.kernel = kernel
        self.arch = None if target.arches is None else tuple(target.arches)[0]
        self.tree = None if target.trees is None else tuple(target.trees)[0]
        self.components = None if target.components is None \
            else sorted(target.components)

        # Filter domains, if any
        domains = Scene._filter_domains(database, domain_paths, self.arch)
        # Filter host types
        host_types = Scene._filter_host_types(database, target)
        # Find sources in files
        sources = database.select_sources(files)
        # Create test runs for tests filtered by the parameters
        tests = Scene._filter_and_create_tests(
            database=database,
            target=target,
            sources=sources,
            triggered=triggered,
            targeted=targeted,
            high_cost=high_cost,
            match_sets=match_sets or (lambda _: True),
            name_regexes=[".*"] if test_regexes is None else test_regexes
        )
        # Build the global host condition
        global_host_condition = [
            "T:" + database.components[name].host_requires
            for name in ((target.components or set()) &
                         set(database.components))
            if database.components[name].host_requires
        ]
        # Collect host parameters, distributing host types to domains
        hosts = Scene._distribute_host_params(domains, host_types,
                                              global_host_condition)

        # Distribute tests to hosts
        tests_with_hosts = []
        for test in tests:
            # Assign the test to the first host with matching type, if any
            for host in hosts:
                if not test.host_types or \
                   test.host_types.fullmatch(host['host_type_name']):
                    host['host_tests'][""].append(test)
                    tests_with_hosts.append(test)
                    break
                for guest_name in host['host_type_data'].guests:
                    if test.host_types.fullmatch(
                        host['host_type_name'] + "/" + guest_name
                    ):
                        host['host_tests'][guest_name].append(test)
                        tests_with_hosts.append(test)
                        break
                else:
                    continue
                break

        # Remove tests without hosts
        tests = tests_with_hosts

        # Remove hosts without tests
        hosts = [
            host for host in hosts
            if any(tests for tests in host['host_tests'].values())
        ]

        # Expose sources we found
        self.sources = sources

        # Expose data for the executed tests
        self.tests = {t.data for t in tests}

        # Build sources->targeted test data map
        self.sources_targeted_tests = None if sources is None else {
            s: set(t.data for t in tests if s in t.targeted_sources)
            for s in sources
        }

        # Build test data->targeted sources map
        self.tests_targeted_sources = {
            t.data: t.targeted_sources for t in tests
        }

        # Build sources->triggered test data map
        self.sources_triggered_tests = None if sources is None else {
            s: set(t.data for t in tests if s in t.triggered_sources)
            for s in sources
        }

        # Build test data->triggered sources map
        self.tests_triggered_sources = {
            t.data: t.triggered_sources for t in tests
        }

        # Distribute hosts to explicitly-defined recipesets
        self.recipesets = []
        # For each list of host type names in every recipeset def
        for recipeset_host_type_names in database.recipesets.values():
            # Collect hosts for the recipeset
            recipeset = []
            # For each host type name in the recipeset def
            for recipeset_host_type_name in recipeset_host_type_names:
                # Find the first matching host
                for host_idx, host in enumerate(hosts):
                    if host['host_type_name'] == recipeset_host_type_name:
                        # Consume the host
                        recipeset.append(Host(**hosts.pop(host_idx)))
                        break
            # If the recipeset got any hosts
            if recipeset:
                # Sort hosts by types for output stability
                recipeset.sort(key=lambda h: h.type_name)
                # Add the recipeset
                self.recipesets.append(recipeset)

        # Put remaining hosts into dedicated implicit recipesets
        for host in hosts:
            self.recipesets.append([Host(**host)])

    def is_executable(self):
        """
        Check if the scene is executable.

        Check if it has all the necessary data for its tests to run.

        Returns:
            True if the scene is executable. False otherwise.
        """
        return not (
            self.arch is None or self.tree is None
        )


class InvalidAssignments(Exception):
    """Variable assignments are invalid."""


class InvalidSchema(Exception):
    """Output does not validate against a schema."""


class Scenario:
    """A collection of scenes (runs of tests in a database)."""

    def __init__(self, database):
        """
        Initialize a test execution scenario.

        Args:
            database:   The database to use as the data source for test
                        execution scenes, and for generating the run XML.
        """
        assert isinstance(database, data.Base)
        self.database = database
        self.scenes = []

    # pylint: disable=too-many-arguments
    def add_scene(self, *, domain_paths, target, files=None,
                  triggered=None, targeted=None,
                  high_cost=HighCostCondition.YES,
                  match_sets=None, test_regexes=None, kernel=None):
        """
        Add a new scene to the scenario.

        Add a new scene to the scenario using the database the scenario was
        created with and the specified scene parameters.

        Args:
            domain_paths:   A list of paths of host domains to run tests on.
                            Each must be present in the database. No effect,
                            if the database has no domains defined.
            target:         A test execution target (a kpet.data.Target
                            instance) to match/filter tests with.
                            Each target's tree, architecture, and component
                            must be present in the database. Both the target's
                            tree and architecture must be known (not None),
                            for the scene (its tests) to be executable.
            files:          A set of file paths including the sources the
                            tests should cover, if applicable, or None, if
                            unknown. Default is None.
            triggered:      True if only triggered tests should be executed.
                            False if triggered tests should not be executed.
                            None triggered status of tests should be ignored.
            targeted:       True if only targeted tests should be executed.
                            False if targeted tests should not be executed.
                            None targeted status of tests should be ignored.
            high_cost:      The condition for inclusion of high-cost tests
                            (a HighCostCondition enum member). Default is
                            HighCostCondition.YES.
            match_sets:     A function matching sets a test belongs to and
                            returning True, if it should be included in the
                            filtering, and False, if not. Must accept a set of
                            strings (names of sets the test belongs to) as the
                            only argument. Can be None to match any set
                            membership. Default is None.
            test_regexes:   A list of string regexes to match
                            test names against.
                            Tests that match any regex will be included in the
                            filtering. None if all tests should be included.
                            Default is None.
            kernel:         An (arbitrary) string representing the kernel
                            location, or None, meaning no kernel was
                            specified. Default is None.
        """
        assert self.database.domains is None or \
            (
                isinstance(domain_paths, list) and
                all(path in self.database.all_domains for path in domain_paths)
            )
        assert isinstance(target, data.Target)
        assert target.trees is None or \
            isinstance(target.trees, set) and \
            target.trees <= set(self.database.trees)
        assert target.arches is None or \
            isinstance(target.arches, set) and \
            target.arches <= set(self.database.arches)
        assert target.components is None or \
            target.components <= set(self.database.components)
        assert files is None or \
            (isinstance(files, set) and
             all(isinstance(f, str) for f in files))
        assert triggered is None or isinstance(triggered, bool)
        assert targeted is None or isinstance(targeted, bool)
        assert isinstance(high_cost, HighCostCondition)
        assert match_sets is None or callable(match_sets)
        assert test_regexes is None or \
            (isinstance(test_regexes, list) and
             all(isinstance(regex, str) for regex in test_regexes))
        assert kernel is None or isinstance(kernel, str)
        self.scenes.append(Scene(
            database=self.database,
            domain_paths=domain_paths,
            target=target,
            files=files,
            triggered=triggered,
            targeted=targeted,
            high_cost=high_cost,
            match_sets=match_sets,
            test_regexes=test_regexes,
            kernel=kernel
        ))

    def is_executable(self):
        """
        Check if the scenario is executable.

        Check if all its scenes are executable, i.e. have enough data for
        tests to run.

        Returns:
            True if the scenario is executable. False otherwise.
        """
        return all(scene.is_executable() for scene in self.scenes)

    def variables_are_valid(self, variables):
        """
        Check that a dictionary of template variables is valid.

        Check that a dictionary of template variables contains values for all
        variables described in the database, and they're of the correct type.

        Args:
            variables:  The dictionary of template variables to check.

        Returns:
            True if the variables are valid, false otherwise.
        """
        return \
            isinstance(variables, dict) and \
            set(variables) == set(self.database.variables.keys()) and \
            all(
                isinstance(variables[name], variable_def['type'])
                for name, variable_def in self.database.variables.items()
            )

    def assignments_parse(self, assignments):
        """
        Parse a list of template variable assignments.

        Parse a list of template variable assignments into a dictionary of
        variables, with defaults applied, and value types and requirements
        checked against variable definitions in the database.

        Args:
            assignments:    A list of strings, each containing a variable
                            name, followed by an equals character ('='),
                            and then the variable's value.

        Returns:
            A dictionary of parsed variables.

        Raises:
            InvalidAssignments if an assignment syntax is invalid, a variable
            name is unknown, or a required value is missing, or has invalid
            type.
        """
        # Parse variable assignments
        variables = {n: v['default']
                     for n, v in self.database.variables.items()
                     if 'default' in v}
        for assignment in assignments:
            match = re.fullmatch("([^=]+)=(.*)", assignment)
            if match is None:
                raise InvalidAssignments(
                    f"Invalid syntax: \"{assignment}\""
                )
            name = match.group(1)
            if name not in self.database.variables:
                raise InvalidAssignments(f"Unknown variable: {name!r}")
            value_string = match.group(2)
            value = self.database.variable_value_parse(
                self.database.variables[name], value_string
            )
            if value is None:
                raise InvalidAssignments(
                    f"Invalid value {value_string!r} for variable {name!r} "
                    f"of type "
                    f"{self.database.variables[name]['type'].__name__!r}"
                )
            variables[name] = value

        # Check if all variables without defaults are set
        unset_quoted_names = []
        for name, variable_def in self.database.variables.items():
            if 'default' not in variable_def and name not in variables:
                unset_quoted_names.append(repr(name))
        if unset_quoted_names:
            raise InvalidAssignments(
                "Required variables not set: " + ", ".join(unset_quoted_names)
            )

        assert self.variables_are_valid(variables)
        return variables

    def generate(self, description, lint, variables):
        """
        Generate an XML document.

        Generate an XML document describing the hosts and tests involved in
        executing the scenario. The scenario must be executable for this to
        succeed.

        Args:
            description:    The run description string.
            lint:           Lint and reformat the resulting XML, if True.
            variables:      A dictionary of extra template variables.
        Returns:
            A string containing the generated XML document.
        """
        assert isinstance(description, str)
        assert isinstance(lint, bool)
        assert self.variables_are_valid(variables)
        assert self.is_executable()

        template_path = self.database.template
        if template_path is None:
            return ""

        params = {
            'DESCRIPTION': description,
            'SCENES': self.scenes,
            'VARIABLES': variables,
        }

        jinja_env = jinja2.Environment(
            loader=jinja2.FileSystemLoader([self.database.dir_path]),
            trim_blocks=True,
            keep_trailing_newline=True,
            lstrip_blocks=True,
            autoescape=jinja2.select_autoescape(
                enabled_extensions=('xml'),
                default_for_string=True,
            ),
            undefined=jinja2.StrictUndefined,
        )
        # TODO: Remove once users switched to "unindent"
        jinja_env.filters['dedent'] = dedent
        jinja_env.filters['unindent'] = misc.unindent
        template = jinja_env.get_template(template_path)
        text = template.render(params)

        if lint:
            parser = etree.XMLParser(remove_blank_text=True)
            tree = etree.XML(text.encode("utf-8"), parser)
            for schema_path in self.database.schemas:
                try:
                    # NOTE: data.Base makes sure valid extensions are there
                    schema = {
                        'xsd': etree.XMLSchema,
                        'rng': etree.RelaxNG,
                        'sch': isoschematron.Schematron,
                    }[schema_path.split('.')[-1]](etree.parse(schema_path))
                except (etree.LxmlError, OSError) as exc:
                    raise Exception(
                        f"Failed loading schema {schema_path!r}"
                    ) from exc

                if not schema.validate(tree):
                    raise InvalidSchema("\n" + repr(schema.error_log))
            text = etree.tostring(tree, encoding="utf-8",
                                  xml_declaration=True,
                                  pretty_print=True).decode("utf-8")
        return text

    def get_sources(self):
        """
        Get the set of source file paths supplied to the scenario.

        Get the set of paths to source files (subset of file paths) supplied
        to all the scenes in the scenario together, or None, if at least one
        scene was added with unknown files.

        Returns:
            The set of source file paths, or None, if at least one scene
            doesn't know its sources.
        """
        sources = set()
        for scene in self.scenes:
            if scene.sources is None:
                return None
            sources |= scene.sources
        return sources

    def get_tests(self):
        """
        Get a set of test data objects.

        Get a set of data objects (kpet.data.Test instances) of tests executed
        in the scenario.

        Returns:
            The set of data objects for executed tests.
        """
        return reduce(
            lambda x, y: x | y,
            (scene.tests for scene in self.scenes)
        )

    def get_sources_targeted_tests(self):
        """
        Get a dictionary of sources and tests targeting them.

        Get a dictionary of sources and sets of data for the tests
        (kpet.data.Test objects) that target them, or None if sources are
        unknown.

        Returns:
            A dictionary with source file paths as keys and sets of
            kpet.data.Test objects as values, or None if the scenario contains
            at least one scene with unknown sources.
        """
        sources_targeted_tests = {}
        for scene in self.scenes:
            if scene.sources_targeted_tests is None:
                return None
            for source, tests in scene.sources_targeted_tests.items():
                if source in sources_targeted_tests:
                    sources_targeted_tests[source] |= tests
                else:
                    sources_targeted_tests[source] = tests.copy()
        return sources_targeted_tests

    def get_tests_targeted_sources(self):
        """
        Get a dictionary of tests and sources targeted by them.

        Get a dictionary of test data (kpet.data.Test objects) and sets of
        sources that they target (or None values if sources are unknown).

        Returns:
            A dictionary with kpet.data.Test objects as keys, and sets of
            source file paths as values; or None as values, if the scenario
            has at least one scene with unknown sources.
        """
        tests_targeted_sources = {}
        for scene in self.scenes:
            for test, sources in scene.tests_targeted_sources.items():
                if sources is None:
                    return {
                        t: None
                        for s in self.scenes
                        for t in s.tests_targeted_sources
                    }
                if test in tests_targeted_sources:
                    tests_targeted_sources[test] |= sources
                else:
                    tests_targeted_sources[test] = sources.copy()
        return tests_targeted_sources

    def get_sources_triggered_tests(self):
        """
        Get a dictionary of sources and tests triggered by them.

        Get a dictionary of sources and sets of data for the tests
        (kpet.data.Test objects) that where triggered by them, or None if
        sources are unknown.

        Returns:
            A dictionary with source file paths as keys and sets of
            kpet.data.Test objects as values, or None if the scenario contains
            at least one scene with unknown sources.
        """
        sources_triggered_tests = {}
        for scene in self.scenes:
            if scene.sources_triggered_tests is None:
                return None
            for source, tests in scene.sources_triggered_tests.items():
                if source in sources_triggered_tests:
                    sources_triggered_tests[source] |= tests
                else:
                    sources_triggered_tests[source] = tests.copy()
        return sources_triggered_tests

    def get_tests_triggered_sources(self):
        """
        Get a dictionary of tests and sources which triggered them.

        Get a dictionary of test data (kpet.data.Test objects) and sets of
        sources that triggered them (or None values if sources are unknown).

        Returns:
            A dictionary with kpet.data.Test objects as keys, and sets of
            source file paths as values; or None as values, if the scenario
            has at least one scene with unknown sources.
        """
        tests_triggered_sources = {}
        for scene in self.scenes:
            for test, sources in scene.tests_triggered_sources.items():
                if sources is None:
                    return {
                        t: None
                        for s in self.scenes
                        for t in s.tests_triggered_sources
                    }
                if test in tests_triggered_sources:
                    tests_triggered_sources[test] |= sources
                else:
                    tests_triggered_sources[test] = sources.copy()
        return tests_triggered_sources
